package org.codehaus.activemq.broker.impl;

import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.Broker;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.broker.BrokerContainer;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.CapacityInfo;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DurableUnsubscribe;
import org.codehaus.activemq.message.IntResponseReceipt;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.ResponseReceipt;
import org.codehaus.activemq.message.SessionInfo;
import org.codehaus.activemq.message.TransactionInfo;
import org.codehaus.activemq.message.XATransactionInfo;
import org.codehaus.activemq.message.util.BoundedPacketQueue;
import org.codehaus.activemq.message.util.SpooledBoundedPacketQueue;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.util.IdGenerator;

public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
	/** Logger shared by all instances of this class. */
	private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
	/** Connector that every broker-side operation is delegated to; assigned in initialize(). */
	private BrokerConnector brokerConnector;
	/** Transport used to exchange packets with the remote peer; assigned in initialize(). */
	private TransportChannel channel;
	/** Most recent ConnectionInfo received from the peer; null until one arrives. */
	private ConnectionInfo connectionInfo;
	/** Source of ids for the receipts this client sends. */
	private IdGenerator packetIdGenerator;
	/** Flipped to true exactly once by close(); checked before consuming or sending. */
	private SynchronizedBoolean closed;
	/** Consumers currently registered with the connector (subset of {@link #consumers}). */
	private Set activeConsumers;
	/** Every started ConsumerInfo announced by the peer. */
	private CopyOnWriteArrayList consumers;
	/** Every started ProducerInfo announced by the peer. */
	private CopyOnWriteArrayList producers;
	/** Ids of transactions that were started and not yet committed or rolled back. */
	private CopyOnWriteArrayList transactions;
	/** XA transaction ids; iterated as ActiveMQXid instances and rolled back in cleanUp(). */
	private CopyOnWriteArrayList xatransactions;
	/** Every started SessionInfo announced by the peer. */
	private CopyOnWriteArrayList sessions;
	/** Set once the first started ConnectionInfo has been processed. */
	private boolean started;
	/** Returned by isBrokerConnection(); never assigned anywhere in this class. */
	private boolean brokerConnection;
	/** Last capacity reported by the peer via CapacityInfo; starts at 100. */
	private int capacity = 100;
	/** Spool used while the peer is a slow consumer; created lazily in dispatch(). */
	private SpooledBoundedPacketQueue spoolQueue;
	/** Guards cleanUp() so that its body runs at most once. */
	private boolean cleanedUp;

	/**
	 * Creates a client with empty bookkeeping collections, a fresh id generator and the
	 * closed flag cleared. The connector and channel are supplied later through initialize().
	 */
	public BrokerClientImpl() {
		closed = new SynchronizedBoolean(false);
		packetIdGenerator = new IdGenerator();

		// Registration bookkeeping, all initially empty.
		activeConsumers = new HashSet();
		consumers = new CopyOnWriteArrayList();
		producers = new CopyOnWriteArrayList();
		sessions = new CopyOnWriteArrayList();
		transactions = new CopyOnWriteArrayList();
		xatransactions = new CopyOnWriteArrayList();
	}

	/**
	 * Binds this client to its connector and transport channel and installs this object as
	 * the channel's packet listener and exception listener.
	 *
	 * @param brokerConnector connector that broker operations are delegated to
	 * @param channel transport channel to the remote peer
	 */
	public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
		this.channel = channel;
		this.brokerConnector = brokerConnector;
		channel.setPacketListener(this);
		channel.setExceptionListener(this);
		log.trace("brokerConnectorConnector client initialized");
	}

	/**
	 * Exception callback: logs the failure at WARN level and then closes this client.
	 *
	 * @param jmsEx the exception that was reported
	 */
	public void onException(JMSException jmsEx) {
		final String description = this + " caught exception ";
		log.warn(description, jmsEx);
		close();
	}

	/**
	 * Describes this client as {@code "brokerConnector-client: <clientId>: <channel>"}; the
	 * client id part is left empty while no ConnectionInfo has been received.
	 */
	public String toString() {
		StringBuffer text = new StringBuffer("brokerConnector-client: ");
		if (connectionInfo != null) {
			text.append(connectionInfo.getClientId());
		}
		text.append(": ").append(channel);
		return text.toString();
	}

	/**
	 * Delivers a message to the remote peer.
	 * <p>
	 * While the peer is not a slow consumer the message is sent straight to the channel.
	 * Otherwise it is appended to a spool queue (created on first use) that a background
	 * task drains towards the channel. Any failure to create or fill the spool closes
	 * this client.
	 *
	 * @param message the message to deliver
	 */
	public void dispatch(ActiveMQMessage message) {
		if (!isSlowConsumer()) {
			send(message);
			return;
		}
		if (this.spoolQueue == null) {
			startSpooling();
		}
		// Still null when the spool could not be created; the client has been closed then.
		if (this.spoolQueue != null) {
			try {
				this.spoolQueue.enqueue(message);
			} catch (JMSException e) {
				log.error("Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
						e);

				close();
			}
		}
	}

	/**
	 * Creates the spool queue for this slow consumer and starts the background task that
	 * forwards spooled packets to the channel until this client is closed.
	 */
	private void startSpooling() {
		log.warn("Connection: " + this.connectionInfo.getClientId() + " is a slow consumer");
		String spoolName = this.brokerConnector.getBrokerInfo().getBrokerName() + "_"
				+ this.connectionInfo.getClientId();
		try {
			this.spoolQueue = new SpooledBoundedPacketQueue(
					this.brokerConnector.getBrokerContainer().getBroker().getTempDir(), spoolName);

			// Must be final to be captured by the anonymous Runnable below.
			final BoundedPacketQueue bpq = this.spoolQueue;
			ThreadedExecutor exec = new ThreadedExecutor();
			exec.execute(new Runnable() {
				public void run() {
					while (!BrokerClientImpl.this.closed.get()) {
						try {
							Packet packet = bpq.dequeue();
							// Forward what was spooled; previously the dequeued packet was dropped.
							// The null guard is defensive: whether dequeue() can return null
							// cannot be verified from this file.
							if (packet != null) {
								send(packet);
							}
						} catch (InterruptedException e) {
							BrokerClientImpl.log.warn("async dispatch got an interupt", e);
						} catch (JMSException e) {
							BrokerClientImpl.log.error("async dispatch got an problem", e);
						}
					}
				}
			});
		} catch (IOException e) {
			log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
			close();
		} catch (InterruptedException e) {
			log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
			close();
		}
	}

	/**
	 * @return the value of the {@code brokerConnection} flag
	 */
	public boolean isBrokerConnection() {
		return brokerConnection;
	}

	/**
	 * @return the capacity most recently recorded for the remote peer
	 */
	public int getCapacity() {
		return capacity;
	}

	/**
	 * @return the client id from the current ConnectionInfo, or {@code null} when no
	 *         ConnectionInfo has been received yet
	 */
	public String getClientID() {
		return (connectionInfo == null) ? null : connectionInfo.getClientId();
	}

	/**
	 * @return the transport channel this client was initialized with
	 */
	public TransportChannel getChannel() {
		return channel;
	}

	/**
	 * @return {@code true} when the recorded capacity of the peer is 20 or less
	 */
	public boolean isSlowConsumer() {
		final boolean slow = capacity <= 20;
		return slow;
	}

	/**
	 * Handles one inbound packet from the transport channel.
	 * <p>
	 * JMS messages are stamped with this connection's client id as producer id and passed
	 * on for sending; every other packet is routed by its numeric packet type to the
	 * matching handler. Whatever the outcome, a receipt is sent afterwards if the packet
	 * asks for one, carrying the caught exception when handling failed. Nothing happens
	 * once this client is closed.
	 *
	 * @param packet the packet that was received
	 */
	public void consume(Packet packet) {
		if (this.closed.get()) {
			return;
		}
		Throwable requestEx = null;
		boolean failed = false;
		try {
			if (packet.isJMSMessage()) {
				ActiveMQMessage message = (ActiveMQMessage) packet;

				if (this.connectionInfo != null) {
					message.setProducerID(this.connectionInfo.getClientId());
				} else {
					log.warn("No connection info available! Maybe the client forgot to start() the Connection?");
				}
				consumeActiveMQMessage(message);
			} else {
				switch (packet.getPacketType()) {
				case 15: // message acknowledgement
					consumeMessageAck((MessageAck) packet);
					break;
				case 20: // XA transaction command
					consumeXATransactionInfo((XATransactionInfo) packet);
					break;
				case 19: // local transaction command
					consumeTransactionInfo((TransactionInfo) packet);
					break;
				case 17: // consumer start/stop
					consumeConsumerInfo((ConsumerInfo) packet);
					break;
				case 18: // producer start/stop
					consumeProducerInfo((ProducerInfo) packet);
					break;
				case 23: // session start/stop
					consumeSessionInfo((SessionInfo) packet);
					break;
				case 22: // connection start/stop/close
					consumeConnectionInfo((ConnectionInfo) packet);
					break;
				case 24: // durable unsubscribe
					this.brokerConnector.durableUnsubscribe(this, (DurableUnsubscribe) packet);
					break;
				case 27: // capacity report from the peer
					consumeCapacityInfo((CapacityInfo) packet);
					break;
				case 28: // peer asks for the broker's capacity
					updateCapacityInfo(packet.getId());
					break;
				default:
					// Includes types 16, 21, 25 and 26, which have no handler here.
					log.warn("Unknown Packet received: " + packet);
				}
			}
		} catch (Throwable e) {
			// Report the failure to the peer through the receipt instead of propagating it.
			// The logger already records the stack trace, so nothing is printed to stderr.
			requestEx = e;
			log.info("caught exception consuming packet: " + packet, e);
			failed = true;
		}
		sendReceipt(packet, requestEx, failed);
	}

	/**
	 * Records a consumer start or stop.
	 * <p>
	 * A started consumer is remembered and, if the connection itself has been started and
	 * the consumer is not already active, registered with the connector. A stopped consumer
	 * is forgotten and, if it was active, deregistered from the connector.
	 *
	 * @param info the consumer description
	 * @throws JMSException if the connector rejects the (de)registration
	 */
	public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
		if (!info.isStarted()) {
			consumers.remove(info);
			boolean wasActive = activeConsumers.remove(info);
			if (wasActive) {
				brokerConnector.deregisterMessageConsumer(this, info);
			}
			return;
		}

		consumers.add(info);
		boolean connectionStarted = connectionInfo != null && connectionInfo.isStarted();
		// Only touch activeConsumers when the connection is running.
		if (connectionStarted && activeConsumers.add(info)) {
			brokerConnector.registerMessageConsumer(this, info);
		}
	}

	/**
	 * Sends the peer a CapacityInfo naming this broker, carrying the given capacity and the
	 * flow-control timeout derived from it.
	 *
	 * @param capacity the capacity value to report
	 */
	public void updateBrokerCapacity(int capacity) {
		final String brokerName = brokerConnector.getBrokerInfo().getBrokerName();
		final long timeout = getFlowControlTimeout(capacity);

		CapacityInfo report = new CapacityInfo();
		report.setResourceName(brokerName);
		report.setCapacity(capacity);
		report.setFlowControlTimeout(timeout);
		send(report);
	}

	/**
	 * Applies a connection state change announced by the peer.
	 * <p>
	 * A closed connection is cleaned up, acknowledged with a receipt and then closed. The
	 * first started connection registers this client with the connector and copies the
	 * client id onto the consumers and producers already known. After that, a started
	 * connection activates every known consumer that is not active yet, and a stopped one
	 * deactivates every active consumer.
	 *
	 * @param info the connection description; becomes the current connection info
	 * @throws JMSException if the connector rejects a registration or deregistration
	 */
	public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
		this.connectionInfo = info;
		if (info.isClosed()) {
			cleanUp();
			try {
				sendReceipt(info);
				// The receipt has been sent here; stop consume() from sending a second one.
				info.setReceiptRequired(false);
				try {
					// Pause before stopping the channel (presumably so the asynchronously
					// sent receipt can go out first).
					Thread.sleep(500L);
				} catch (InterruptedException e) {
					// Keep the interrupt visible to the caller instead of swallowing it.
					Thread.currentThread().interrupt();
				}
			} finally {
				close();
			}
			return;
		}

		if ((!this.started) && (info.isStarted())) {
			this.started = true;
			if (log.isDebugEnabled()) {
				log.debug(this + " has started");
			}
			this.brokerConnector.registerClient(this, info);

			// Consumers/producers announced before the connection started get its client id.
			for (Iterator i = this.consumers.iterator(); i.hasNext();) {
				ConsumerInfo ci = (ConsumerInfo) i.next();
				ci.setClientId(info.getClientId());
			}
			for (Iterator i = this.producers.iterator(); i.hasNext();) {
				ProducerInfo pi = (ProducerInfo) i.next();
				pi.setClientId(info.getClientId());
			}
		}

		if (info.isStarted()) {
			for (Iterator i = this.consumers.iterator(); i.hasNext();) {
				ConsumerInfo ci = (ConsumerInfo) i.next();
				if (this.activeConsumers.add(ci)) {
					this.brokerConnector.registerMessageConsumer(this, ci);
				}
			}
		} else {
			if (log.isDebugEnabled()) {
				log.debug(this + " has stopped");
			}
			for (Iterator i = this.consumers.iterator(); i.hasNext();) {
				ConsumerInfo ci = (ConsumerInfo) i.next();
				if (this.activeConsumers.remove(ci)) {
					this.brokerConnector.deregisterMessageConsumer(this, ci);
				}
			}
		}
	}

	/**
	 * Starts the underlying transport channel.
	 *
	 * @throws JMSException if the channel fails to start
	 */
	public void start() throws JMSException {
		channel.start();
	}

	/**
	 * Stops the underlying transport channel after tracing which channel is being stopped.
	 *
	 * @throws JMSException declared for callers; see the channel implementation
	 */
	public void stop() throws JMSException {
		final String traceLine = "Stopping channel: " + channel;
		log.trace(traceLine);
		channel.stop();
	}

	/**
	 * Releases everything this client registered with the connector. Runs at most once;
	 * later calls only log that they were ignored.
	 * <p>
	 * Consumers, producers and sessions are marked stopped and deregistered, open local
	 * transactions and XA transactions are rolled back, and finally - even if one of those
	 * steps failed - the bookkeeping lists are cleared and the client itself is
	 * deregistered. A JMSException from any step is logged, not propagated.
	 */
	public synchronized void cleanUp() {
		if (this.cleanedUp) {
			log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
			return;
		}
		this.cleanedUp = true;
		try {
			try {
				Iterator i;
				for (i = this.consumers.iterator(); i.hasNext();) {
					ConsumerInfo info = (ConsumerInfo) i.next();
					info.setStarted(false);
					this.brokerConnector.deregisterMessageConsumer(this, info);
				}
				for (i = this.producers.iterator(); i.hasNext();) {
					ProducerInfo info = (ProducerInfo) i.next();
					info.setStarted(false);
					this.brokerConnector.deregisterMessageProducer(this, info);
				}
				for (i = this.sessions.iterator(); i.hasNext();) {
					SessionInfo info = (SessionInfo) i.next();
					info.setStarted(false);
					this.brokerConnector.deregisterSession(this, info);
				}
				for (i = this.transactions.iterator(); i.hasNext();) {
					this.brokerConnector.rollbackTransaction(this, i.next().toString());
				}
				for (i = this.xatransactions.iterator(); i.hasNext();) {
					try {
						this.brokerConnector.rollbackTransaction(this, (ActiveMQXid) i.next());
					} catch (XAException e) {
						// One failed XA rollback must not stop the remaining ones.
						log.warn("Transaction rollback failed:", e);
					}
				}
			} finally {
				// Logged at the level the guard checks, matching the other
				// "has started"/"has stopped" messages in this class.
				if (log.isDebugEnabled()) {
					log.debug(this + " has stopped");
				}
				this.consumers.clear();
				this.producers.clear();
				this.transactions.clear();
				this.xatransactions.clear();
				this.sessions.clear();
				this.brokerConnector.deregisterClient(this, this.connectionInfo);
			}
		} catch (JMSException e) {
			log.warn("failed to de-register Broker client: " + e, e);
		}
	}

	/**
	 * Hands a packet to the channel for asynchronous sending unless this client is already
	 * closed. A JMSException from the channel is logged and closes this client.
	 *
	 * @param packet the packet to send
	 */
	protected void send(Packet packet) {
		if (closed.get()) {
			return;
		}
		try {
			channel.asyncSend(packet);
		} catch (JMSException sendFailure) {
			log.warn(this + " caught exception ", sendFailure);
			close();
		}
	}

	/**
	 * Marks this client closed and stops the channel. The channel is stopped only by the
	 * call that actually switches the closed flag from false to true.
	 */
	protected void close() {
		final boolean firstClose = closed.commit(false, true);
		if (firstClose) {
			channel.stop();
		}
	}

	/**
	 * Passes an inbound message to the connector, using the transacted variant when the
	 * message belongs to a transaction.
	 *
	 * @param message the message received from the peer
	 * @throws JMSException if the connector fails to accept the message
	 */
	private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
		if (!message.isPartOfTransaction()) {
			brokerConnector.sendMessage(this, message);
			return;
		}
		brokerConnector.sendTransactedMessage(this, message.getTransactionId(), message);
	}

	/**
	 * Passes an acknowledgement to the connector, using the transacted variant when the
	 * acknowledgement belongs to a transaction.
	 *
	 * @param ack the acknowledgement received from the peer
	 * @throws JMSException if the connector fails to process the acknowledgement
	 */
	private void consumeMessageAck(MessageAck ack) throws JMSException {
		if (!ack.isPartOfTransaction()) {
			brokerConnector.acknowledgeMessage(this, ack);
			return;
		}
		brokerConnector.acknowledgeTransactedMessage(this, ack.getTransactionId(), ack);
	}

	/**
	 * Executes a local transaction command.
	 * <p>
	 * Type 101 starts the transaction and begins tracking its id. Type 105 rolls it back
	 * and type 103 commits it; for these and for every other type the id is dropped from
	 * tracking afterwards.
	 *
	 * @param info the transaction command
	 * @throws JMSException if the connector fails to execute the command
	 */
	private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
		final int type = info.getType();
		if (type == 101) {
			transactions.add(info.getTransactionId());
			brokerConnector.startTransaction(this, info.getTransactionId());
			return;
		}

		switch (type) {
		case 105:
			brokerConnector.rollbackTransaction(this, info.getTransactionId());
			break;
		case 103:
			brokerConnector.commitTransaction(this, info.getTransactionId());
			break;
		default:
			// No connector call for other types; the id is still untracked below.
			break;
		}
		transactions.remove(info.getTransactionId());
	}

	/**
	 * Executes an XA transaction command, selected by the numeric type of the packet.
	 * <ul>
	 * <li>101 - starts the transaction and begins tracking its Xid;</li>
	 * <li>110 - replies with the connector's prepared transactions;</li>
	 * <li>113 - replies with the connector's resource manager id;</li>
	 * <li>106 - ignored;</li>
	 * <li>102 - prepares the transaction and replies with the integer result;</li>
	 * <li>105 - rolls back; 109 - commits with the flag {@code true};
	 * 103 - commits with the flag {@code false}.</li>
	 * </ul>
	 * After 102, 105, 109 and 103 the Xid is no longer tracked. Commands that reply with
	 * their own receipt clear the packet's receipt-required flag so that no second,
	 * generic receipt is sent for it.
	 * <p>
	 * Xids are tracked in {@code xatransactions}, the list that cleanUp() walks as
	 * ActiveMQXid instances, rather than in the list of local transaction ids.
	 *
	 * @param info the XA transaction command
	 * @throws JMSException if the type is not recognized or the connector fails
	 * @throws XAException if the connector reports an XA failure
	 */
	private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
		if (info.getType() == 101) {
			this.xatransactions.add(info.getXid());
			this.brokerConnector.startTransaction(this, info.getXid());
		} else if (info.getType() == 110) {
			ActiveMQXid[] rc = this.brokerConnector.getPreparedTransactions(this);

			info.setReceiptRequired(false);

			ResponseReceipt receipt = new ResponseReceipt();
			receipt.setId(this.packetIdGenerator.generateId());
			receipt.setCorrelationId(info.getId());
			receipt.setResult(rc);
			send(receipt);
		} else if (info.getType() == 113) {
			String rc = this.brokerConnector.getResourceManagerId(this);

			info.setReceiptRequired(false);

			ResponseReceipt receipt = new ResponseReceipt();
			receipt.setId(this.packetIdGenerator.generateId());
			receipt.setCorrelationId(info.getId());
			receipt.setResult(rc);
			send(receipt);
		} else if (info.getType() != 106) {
			if (info.getType() == 102) {
				int rc = this.brokerConnector.prepareTransaction(this, info.getXid());

				info.setReceiptRequired(false);

				IntResponseReceipt receipt = new IntResponseReceipt();
				receipt.setId(this.packetIdGenerator.generateId());
				receipt.setCorrelationId(info.getId());
				receipt.setResult(rc);
				send(receipt);
			} else if (info.getType() == 105) {
				this.brokerConnector.rollbackTransaction(this, info.getXid());
			} else if (info.getType() == 109) {
				this.brokerConnector.commitTransaction(this, info.getXid(), true);
			} else if (info.getType() == 103) {
				this.brokerConnector.commitTransaction(this, info.getXid(), false);
			} else {
				throw new JMSException("Packet type: " + info.getType() + " not recognized.");
			}
			// Prepared or finished: cleanUp() must not roll this Xid back any more.
			this.xatransactions.remove(info.getXid());
		}
	}

	/**
	 * Records a producer start or stop and mirrors it to the connector: a started producer
	 * is remembered and registered, a stopped one is forgotten and deregistered.
	 *
	 * @param info the producer description
	 * @throws JMSException if the connector rejects the (de)registration
	 */
	private void consumeProducerInfo(ProducerInfo info) throws JMSException {
		if (!info.isStarted()) {
			producers.remove(info);
			brokerConnector.deregisterMessageProducer(this, info);
			return;
		}
		producers.add(info);
		brokerConnector.registerMessageProducer(this, info);
	}

	/**
	 * Records a session start or stop and mirrors it to the connector: a started session
	 * is remembered and registered, a stopped one is forgotten and deregistered.
	 *
	 * @param info the session description
	 * @throws JMSException if the connector rejects the (de)registration
	 */
	private void consumeSessionInfo(SessionInfo info) throws JMSException {
		if (!info.isStarted()) {
			sessions.remove(info);
			brokerConnector.deregisterSession(this, info);
			return;
		}
		sessions.add(info);
		brokerConnector.registerSession(this, info);
	}

	/**
	 * Stores the capacity reported by the peer; isSlowConsumer() is derived from it.
	 *
	 * @param info the capacity report
	 */
	private void consumeCapacityInfo(CapacityInfo info) {
		final int reported = info.getCapacity();
		capacity = reported;
	}

	/**
	 * Answers a capacity request: sends a CapacityInfo naming this broker, correlated to
	 * the request, with the connector's current broker capacity and the flow-control
	 * timeout derived from the capacity stored in the reply.
	 *
	 * @param correlationId id of the request being answered
	 */
	private void updateCapacityInfo(String correlationId) {
		CapacityInfo reply = new CapacityInfo();
		reply.setResourceName(brokerConnector.getBrokerInfo().getBrokerName());
		reply.setCorrelationId(correlationId);
		reply.setCapacity(brokerConnector.getBrokerCapacity());

		final long timeout = getFlowControlTimeout(reply.getCapacity());
		reply.setFlowControlTimeout(timeout);
		send(reply);
	}

	/**
	 * Maps a capacity value to a flow-control timeout.
	 *
	 * @param capacity the capacity value
	 * @return 10000 for a capacity of 0 or less, 1000 for 1 to 10, 10 for 11 to 20, and
	 *         -1 for anything above 20
	 */
	private long getFlowControlTimeout(int capacity) {
		if (capacity > 20) {
			return -1L;
		}
		if (capacity > 10) {
			return 10L;
		}
		if (capacity > 0) {
			return 1000L;
		}
		return 10000L;
	}

	/**
	 * Sends a success receipt (no exception, not failed) for the packet if it requires one.
	 *
	 * @param packet the packet being acknowledged
	 */
	private void sendReceipt(Packet packet) {
		final Throwable noError = null;
		sendReceipt(packet, noError, false);
	}

	/**
	 * Sends a receipt for the packet when the packet requires one; otherwise does nothing.
	 * The receipt gets a freshly generated id and is correlated to the packet's id.
	 *
	 * @param packet the packet being acknowledged
	 * @param requestEx the exception raised while handling the packet, or {@code null}
	 * @param failed whether handling the packet failed
	 */
	private void sendReceipt(Packet packet, Throwable requestEx, boolean failed) {
		if (!packet.isReceiptRequired()) {
			return;
		}
		Receipt ack = new Receipt();
		ack.setId(packetIdGenerator.generateId());
		ack.setCorrelationId(packet.getId());
		ack.setException(requestEx);
		ack.setFailed(failed);
		send(ack);
	}
}